package com.meta.api.flink.aggregate;

import org.apache.flink.table.functions.AggregateFunction;

/**
 * Boolean summation operator: an aggregate function that counts how many of
 * its boolean inputs are {@code true}.
 *
 * <p>This outer class is only a holder for the operator name constant, the
 * accumulator type and the aggregate function implementation.
 */
public class BooleanSummationFunctions {

    /** Name under which the operator is meant to be registered. */
    public static final String BOOLSUM = "BOOLSUM";

    /**
     * Accumulator holding the running number of {@code true} inputs.
     *
     * <p>Has a public no-arg constructor and a public field; a fresh instance
     * starts at zero.
     */
    public static class ChooseHitAccum {
        /** Number of {@code true} values seen so far. */
        public long hitNum = 0L;
    }

    /**
     * Aggregate function returning the number of {@code true} values in a group.
     *
     * <pre>{@code
     * // register function
     * StreamTableEnvironment tEnv = ...
     * tEnv.registerFunction(BooleanSummationFunctions.BOOLSUM, new BooleanSummationAvg());
     * // use function
     * tEnv.sqlQuery("SELECT user, BOOLSUM(passed) AS hitCount FROM userScores GROUP BY user");
     * }</pre>
     *
     * <p>Besides the mandatory {@code accumulate}, the optional {@code retract},
     * {@code merge} and {@code resetAccumulator} methods described in Flink's
     * user-defined aggregate function contract are provided, so the function is
     * not limited to plain append-only group aggregations.
     */
    public static class BooleanSummationAvg extends AggregateFunction<Long, ChooseHitAccum> {

        /**
         * Creates an empty accumulator.
         *
         * @return a new accumulator whose count is zero
         */
        @Override
        public ChooseHitAccum createAccumulator() {
            return new ChooseHitAccum();
        }

        /**
         * Returns the aggregation result for the given accumulator.
         *
         * @param acc the accumulator to read
         * @return the number of {@code true} values accumulated in {@code acc}
         */
        @Override
        public Long getValue(ChooseHitAccum acc) {
            return acc.hitNum;
        }

        /**
         * Accumulates one input value: increments the count when it is {@code true}.
         *
         * <p>Not an override: Flink resolves {@code accumulate} by name and
         * signature rather than through an abstract method of the base class.
         *
         * <p>NOTE(review): the parameter is a primitive {@code boolean}; how a
         * nullable BOOLEAN column is handled depends on the planner. Confirm
         * against the Flink version in use before feeding nullable inputs.
         *
         * @param acc the accumulator to update
         * @param hit the input value
         */
        public void accumulate(ChooseHitAccum acc, boolean hit) {
            if (hit) {
                acc.hitNum++;
            }
        }

        /**
         * Retracts one previously accumulated input value; the exact inverse of
         * {@link #accumulate(ChooseHitAccum, boolean)}.
         *
         * @param acc the accumulator to update
         * @param hit the input value being withdrawn
         */
        public void retract(ChooseHitAccum acc, boolean hit) {
            if (hit) {
                acc.hitNum--;
            }
        }

        /**
         * Merges partial accumulators into {@code acc} by adding their counts.
         *
         * @param acc    the accumulator that receives the merged result
         * @param others the partial accumulators to fold into {@code acc}
         */
        public void merge(ChooseHitAccum acc, Iterable<ChooseHitAccum> others) {
            for (ChooseHitAccum other : others) {
                acc.hitNum += other.hitNum;
            }
        }

        /**
         * Resets the accumulator to its initial (zero) state so it can be reused.
         *
         * @param acc the accumulator to reset
         */
        public void resetAccumulator(ChooseHitAccum acc) {
            acc.hitNum = 0L;
        }

    }
}
